Skip to content

Conversation

@abonander
Copy link
Contributor

@abonander abonander commented Jun 6, 2025

Motivation

Because OwnedPermits can be passed around separate from the Senders they came from,
it can be useful to be able to check that a value sent on one will go to the intended receiver.

There's currently no way to get a reference to, or clone of, the Sender from an OwnedPermit without sending a message or releasing it, so calling Sender::same_channel() from an OwnedPermit is impossible.

For context, I'm trying to create an implementation of quinn::AsyncUdpSocket backed by channels to support external transports in a project.

AsyncUdpSocket::try_send() just immediately forwards to Sender::try_send() and returns WouldBlock when the channel is full or BrokenPipe if it gets closed (is that right? I can't find what error a socket with the write half closed is meant to return from sendto()). The UdpPoller returned by create_io_poller() is a wrapper around Sender::reserve_owned(), and calling .poll_writable() polls the reserve_owned function until it returns Ready.

The question then becomes: what to do with the permit? I'd like to actually use it in the next try_send() call, because otherwise a task woken by poll_writable() will immediately get a permit just to drop it, which could wake a different task waiting for the same permit. Depending on scheduling, that other task could run first and steal the space, which means at high contention, a task could theoretically get caught in an unbounded loop between poll_writable() and try_send() because it keeps losing its permit.

(As an aside, I think this is a potential flaw in the design of AsyncUdpSocket in general, but I feel like it's much more likely for an mpsc::channel() buffer to fill up and require waiting for space than the actual socket send buffer filling up unless the network interface is ridiculously slow.)

I can just store it in the structure, but by design, multiple tasks are expected to call poll_writable() and try_send() on the same instance, so unless I store the permits in a map keyed by thread ID, there's no way to guarantee that the task that got the permit in poll_writable() will be the one to use it in try_send().

It occurred to me that I should be able to just store the permit in a thread_local!(), but I realized that there's a possible bug there: if two tasks using different channels are running on the same thread and one stores a permit from poll_writable(), the other one could get the permit in .try_send() and end up sending a packet down the wrong channel.

So I realized I needed to check that the permit will actually send to the same channel. I could just store a clone of the Sender alongside the permit, but that's mildly annoying and touches the refcount of the channel, which could actually have nontrivial overhead at high contention.

Solution

I initially thought that OwnedPermit could just gain a get_sender() method, but because of its internal structure, it can't just return a reference directly. It would have to return another owned Sender which would also require touching the refcount.

While that could be a useful addition on its own, I really just needed the ability to call Sender::same_channel(), which is just a trivial pointer comparison, so it made more sense to implement as a dedicated method.

I debated just implementing the method as fn same_channel(&self, sender: &Sender<T>) but that had a potentially confusing asymmetry compared to Sender::same_channel(), so I renamed the method to same_channel_as_sender() for clarity.

For my use-case, I don't actually need OwnedPermit::same_channel(&self, other: &Self) since I'm not dealing with two OwnedPermits at the same time, but it made sense to include as a direct analogue to Sender::same_channel(). However, I'd be fine to delete it under the YAGNI principle.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Jun 6, 2025
@abonander abonander force-pushed the abonander/ownedpermit-same-channel branch 2 times, most recently from a327271 to 21f8085 Compare June 6, 2025 04:25
* `OwnedPermit::same_channel()`
* `OwnedPermit::same_channel_as_sender()`
@abonander abonander changed the title sync: add OwnedPermit::same_channel() and ::same_channel_as_sender() sync: add same_channel analogue to OwnedPermit Jun 6, 2025
@abonander abonander force-pushed the abonander/ownedpermit-same-channel branch from 21f8085 to cc52c6f Compare June 6, 2025 04:28
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync labels Jun 7, 2025
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm skeptical about putting the permit in a thread-local. It might get stored there for a long time if you don't end up using it, and could be effectively a leak.

But the suggested methods look good to me, so I'll go ahead and merge this.

@Darksonn Darksonn merged commit c38de96 into tokio-rs:master Jun 7, 2025
92 checks passed
hawkw added a commit that referenced this pull request Jul 1, 2025
# 1.46.0 (July 1st, 2025)

### Fixed

- net: fixed `TcpStream::shutdown` incorrectly returning an error on macOS
  ([#7290])

## Added

- sync: `mpsc::OwnedPermit::{same_channel, same_channel_as_sender}` methods
  ([#7389])
- macros: `biased` option for `join!` and `try_join!`, similar to `select!`
  ([#7307])
- net: support for cygwin ([#7393])
- net: support `pope::OpenOptions::read_write` on Android ([#7426])
- net: add `Clone` implementation for `net::unix::SocketAddr` ([#7422])

## Changed

- runtime: eliminate unnecessary lfence while operating on `queue::Local<T>`
  ([#7340])
- task: disallow blocking in `LocalSet::{poll,drop}` ([#7372])

## Unstable

- runtime: add `TaskMeta::spawn_location` tracking where a task was spawned
  ([#7417])
- runtime: removed borrow from `LocalOptions` parameter to
  `runtime::Builder::build_local` ([#7346])

## Documented

- io: clarify behavior of seeking when `start_seek` is not used ([#7366])
- io: document cancellation safety of `AsyncWriteExt::flush` ([#7364])
- net: fix docs for `recv_buffer_size` method ([#7336])
- net: fix broken link of `RawFd` in `TcpSocket` docs ([#7416])
- net: update `AsRawFd` doc link to current Rust stdlib location ([#7429])
- readme: fix double period in reactor description (#7363)
- runtime: add doc note that `on_*_task_poll` is unstable ([#7311])
- sync: update broadcast docs on allocation failure ([#7352])
- time: add a missing panic scenario of `time::advance` ([#7394])

[#7290]: #7290
[#7307]: #7307
[#7311]: #7311
[#7336]: #7336
[#7340]: #7340
[#7346]: #7346
[#7352]: #7352
[#7364]: #7364
[#7366]: #7366
[#7372]: #7372
[#7389]: #7389
[#7393]: #7393
[#7394]: #7394
[#7416]: #7416
[#7422]: #7422
[#7426]: #7426
[#7429]: #7429
[#7417]: #7417
hawkw added a commit that referenced this pull request Jul 1, 2025
# 1.46.0 (July 1st, 2025)

### Fixed

- net: fixed `TcpStream::shutdown` incorrectly returning an error on macOS
  ([#7290])

## Added

- sync: `mpsc::OwnedPermit::{same_channel, same_channel_as_sender}` methods
  ([#7389])
- macros: `biased` option for `join!` and `try_join!`, similar to `select!`
  ([#7307])
- net: support for cygwin ([#7393])
- net: support `pope::OpenOptions::read_write` on Android ([#7426])
- net: add `Clone` implementation for `net::unix::SocketAddr` ([#7422])

## Changed

- runtime: eliminate unnecessary lfence while operating on `queue::Local<T>`
  ([#7340])
- task: disallow blocking in `LocalSet::{poll,drop}` ([#7372])

## Unstable

- runtime: add `TaskMeta::spawn_location` tracking where a task was spawned
  ([#7417])
- runtime: removed borrow from `LocalOptions` parameter to
  `runtime::Builder::build_local` ([#7346])

## Documented

- io: clarify behavior of seeking when `start_seek` is not used ([#7366])
- io: document cancellation safety of `AsyncWriteExt::flush` ([#7364])
- net: fix docs for `recv_buffer_size` method ([#7336])
- net: fix broken link of `RawFd` in `TcpSocket` docs ([#7416])
- net: update `AsRawFd` doc link to current Rust stdlib location ([#7429])
- readme: fix double period in reactor description (#7363)
- runtime: add doc note that `on_*_task_poll` is unstable ([#7311])
- sync: update broadcast docs on allocation failure ([#7352])
- time: add a missing panic scenario of `time::advance` ([#7394])

[#7290]: #7290
[#7307]: #7307
[#7311]: #7311
[#7336]: #7336
[#7340]: #7340
[#7346]: #7346
[#7352]: #7352
[#7364]: #7364
[#7366]: #7366
[#7372]: #7372
[#7389]: #7389
[#7393]: #7393
[#7394]: #7394
[#7416]: #7416
[#7422]: #7422
[#7426]: #7426
[#7429]: #7429
[#7417]: #7417
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-tokio Area: The main tokio crate M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants